Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace websockets with zmq #9173

Merged
merged 1 commit into from
Dec 20, 2024
Merged

Replace websockets with zmq #9173

merged 1 commit into from
Dec 20, 2024

Conversation

xjules
Copy link
Contributor

@xjules xjules commented Nov 11, 2024

Issue

  • POC: testing feasibility of zmq.

  • Refactor tests

Approach
Implementing router-dealer pattern with custom acknowledgments with zmq

  • Server / router (ensemble evaluator) gets messages and keeps track of connected clients wherein in additional it sends ack message to dealaers to confirm that the message has been received .

  • Dealer(s) = clients and dispatchers are sending messages to the router via:

send_multipart([b"", message.encode("utf-8")])
  • dealers always wait for acknowledgment from the evaluator

  • removing websockets, no more wait_for_evaluator

  • Settup encryption with curve

  • each dealer (client, dispatcher) will get a unique name

  • Make sure to check cancellation error when sending event from client

  • Monitor is an advanced version Client

  • Use TCP protocol only when using LSF, SLURM or TORQUE queues

  • PR title captures the intent of the changes, and is fitting for release notes.

  • Added appropriate release note label

  • Commit history is consistent and clean, in line with the contribution guidelines.

  • Make sure unit tests pass locally after every commit (git rebase -i main --exec 'pytest tests/ert/unit_tests -n logical -m "not integration_test"')

When applicable

  • When there are user facing changes: Updated documentation
  • New behavior or changes to existing untested code: Ensured that unit tests are added (See Ground Rules).
  • Large PR: Prepare changes in small commits for more convenient review
  • Bug fix: Add regression test for the bug
  • Bug fix: Create Backport PR to latest release

@xjules xjules self-assigned this Nov 11, 2024
@xjules xjules force-pushed the test_zmq branch 3 times, most recently from feac786 to 5acae39 Compare November 19, 2024 14:58
@xjules xjules marked this pull request as ready for review November 19, 2024 14:59
@xjules xjules force-pushed the test_zmq branch 5 times, most recently from e553dbc to 016b43d Compare November 21, 2024 13:25
@xjules xjules force-pushed the test_zmq branch 4 times, most recently from ca1e52c to 0884d51 Compare December 6, 2024 13:09
@xjules xjules force-pushed the test_zmq branch 4 times, most recently from e3d6fa0 to 8b9a9ab Compare December 10, 2024 14:10
@codecov-commenter
Copy link

codecov-commenter commented Dec 10, 2024

Codecov Report

Attention: Patch coverage is 88.84892% with 31 lines in your changes missing coverage. Please review.

Project coverage is 91.71%. Comparing base (2a4b6be) to head (22b86ee).
Report is 23 commits behind head on main.

Files with missing lines Patch % Lines
src/_ert/forward_model_runner/client.py 85.39% 13 Missing ⚠️
src/ert/ensemble_evaluator/evaluator.py 87.12% 13 Missing ⚠️
src/_ert/forward_model_runner/reporting/event.py 91.42% 3 Missing ⚠️
src/ert/ensemble_evaluator/_ensemble.py 75.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #9173      +/-   ##
==========================================
- Coverage   91.86%   91.71%   -0.15%     
==========================================
  Files         433      430       -3     
  Lines       26780    26684      -96     
==========================================
- Hits        24602    24474     -128     
- Misses       2178     2210      +32     
Flag Coverage Δ
cli-tests 39.45% <65.56%> (-0.30%) ⬇️
everest-models-test 34.19% <74.82%> (-0.38%) ⬇️
gui-tests 71.75% <78.02%> (-0.37%) ⬇️
integration-test 38.34% <76.61%> (+1.13%) ⬆️
performance-tests 51.74% <78.75%> (-0.21%) ⬇️
test 39.63% <76.61%> (-1.05%) ⬇️
unit-tests 74.03% <86.33%> (-0.14%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@jonathan-eq
Copy link
Contributor

All tests passing 😻


def __enter__(self) -> Self:
self.loop.run_until_complete(self.__aenter__())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can remove all of the synced wrappers of the async methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe. I think the synced version if now only used in tests.

Copy link
Contributor Author

@xjules xjules Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made some exploration and the issue that the reporter tests are still fully synced and thus we need to preserve both (async and sync) versions alive. Actually it's only the zmq socket that needs to be synced. Yes, this can go away!

Comment on lines 45 to 60
async def __aexit__(
self, exc_type: Any, exc_value: Any, exc_traceback: Any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this in the method signature?
I see that we decorate the equivalent method in EnsembleEvaluator with contextlib.asynccontextmanager. Maybe we should do the same here if it is not used.

async def _term_receiver_task(self) -> None:
if self._receiver_task and not self._receiver_task.done():
self._receiver_task.cancel()
await asyncio.gather(self._receiver_task, return_exceptions=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to gather this cancelled task, or can we await it (and suppress asyncio.cancellationerror if it is raised)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we capture asyncio.CancelledError we can just do await. :+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the CancelledError is not catched there, so I would keep it.

return self

def term(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rearrange the order of the methods, it is a bit difficult to read

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do 👍

while True:
try:
if self._done.is_set() and start_time is None:
start_time = time.time()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is start_time? Is that the start of closing the event_publisher?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the start_time of closing event_publisher. Maybe I can rename it start_closing_time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds good

@@ -82,14 +65,20 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig):
self._max_batch_size: int = 500
self._batching_interval: float = 2.0
self._complete_batch: asyncio.Event = asyncio.Event()
self._server_started: asyncio.Event = asyncio.Event()
self._clients_connected: set[bytes] = set()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think self._connected_clients would be a better name. clients_connected sounds more like a boolean

Copy link
Contributor

@jonathan-eq jonathan-eq Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what is client? Is it the Client we use for sending/receiving, or is it referring to the monitor for UI?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can rename it to monitors

Copy link
Contributor Author

@xjules xjules Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I was trying to preserve the naming conventions we had before (which was clients), but maybe having it explicit as Monitor would make more sense. Then I would change handle_client -> handle_monitor as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename it now that we are already giving it an overhaul, or is that a separate PR? If not, let's change it to connected_clients

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do it in another PR to keep the semantics as it was before.

Comment on lines +314 to +317
self._router_socket.close()
zmq_context.destroy()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self._router_socket.close()
zmq_context.destroy()
zmq_context.destroy()

The destroy method should close all associated sockets automatically

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it for the time being as to be explicit.

Comment on lines +478 to +417
if sys.platform != "linux":
kwargs["use_ipc_protocol"] = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the tests running on the GH Actions ubuntu runner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is meant only for MacOS GH Action runners.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we not use ipc on the ubuntu runners? Maybe that can speed up some tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we should test tcp sockets too. Hm, let's see.

Copy link

codspeed-hq bot commented Dec 16, 2024

CodSpeed Performance Report

Merging #9173 will not alter performance

Comparing xjules:test_zmq (22b86ee) with main (1bd4fe1)

Summary

✅ 24 untouched benchmarks

@jonathan-eq
Copy link
Contributor

@xjules Seems codspeed is very impressed by your changes 🤣

@xjules xjules force-pushed the test_zmq branch 4 times, most recently from b965bf4 to dec2341 Compare December 19, 2024 14:15
Comment on lines 108 to 114
async def process_message(self, msg: str) -> None:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we throw on a @abstractmethod?
It is defined in monitor.py which inherits from Client

src/ert/ensemble_evaluator/evaluator.py Show resolved Hide resolved
Comment on lines +111 to +117
def signal(self, value):
self.value = value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def signal(self, value):
self.value = value
def signal(self, should_stop: bool):
self.should_stop = should_stop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the signal(value) has more meaning than just "stop". Will put a comment there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment:

"""Mock ZMQ server for testing
        signal = 0: normal operation
        signal = 1: don't send ACK and don't receive messages
        signal = 2: don't send ACK, but receive messages
        """

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good!

while True:
try:
if self._done.is_set() and start_time is None:
start_time = time.time()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds good

src/ert/ensemble_evaluator/evaluator.py Show resolved Hide resolved
Comment on lines +478 to +417
if sys.platform != "linux":
kwargs["use_ipc_protocol"] = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we not use ipc on the ubuntu runners? Maybe that can speed up some tests

tests/ert/unit_tests/ensemble_evaluator/test_monitor.py Outdated Show resolved Hide resolved
tests/ert/unit_tests/shared/test_port_handler.py Outdated Show resolved Hide resolved
tests/ert/utils.py Show resolved Hide resolved
@xjules xjules force-pushed the test_zmq branch 2 times, most recently from 1318cf5 to 0f1bb30 Compare December 20, 2024 13:19
Copy link
Contributor

@jonathan-eq jonathan-eq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good, let's start the new year without websockets 🥇

 - dealers always wait for acknowledgment from the evaluator
 - removing websockets and wait_for_evaluator
 - Settup encryption with curve
 - each dealer (client, dispatcher) will get a unique name
 - Monitor is an advanced version Client
 - _server_started.wait() is to signal that zmq router socket is bound
 - Use TCP protocol only when using LSF, SLURM or TORQUE queues
 -- Use ipc_protocol when using LOCAL driver
 - Remove certificate
 - Remove synced _send from Client
 - Remove cert generator
 - Remove ClientConnectionClosedOK
 - Add test for new connection while closing down evaluator
 - Add test for handle dispatcher and dispatcher messages in evaluator
 - Add tests for ipc and tcp ee config
 - Add test for clear connect and disconnect of Monitor
 - Set a a correct protocol for everestserver
@xjules xjules merged commit 9b1a01e into equinor:main Dec 20, 2024
41 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

5 participants